package cn.doitedu

import ch.hsr.geohash.GeoHash
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import java.util.Properties

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/12/9
 * @Desc:
 *    程序运行前提：
 *     1. 在mysql中要有 t_md_areas 表
 *     2. 在hbase中要创建好 dim_geo_area 表：  hbase(main):003:0> create 'dim_geo_area','f'
 *    本程序的核心知识点： spark中如何把rdd数据写入hbase
 * */
object GeoHashDimTableBuilder {

  /**
   * Entry point: builds the geohash → (province, city, region) dimension table.
   *
   * Pipeline:
   *   1. Read the raw hierarchical area table `t_md_areas` from MySQL via JDBC.
   *   2. Self-join it 4 levels deep to flatten the hierarchy, compute a
   *      5-character geohash for each level-4 point, and deduplicate per geohash.
   *   3. Write the result into the HBase table `dim_geo_area` (family 'f')
   *      using the Hadoop `TableOutputFormat`.
   *
   * Preconditions (from the file header):
   *   - MySQL database `realtimedw` contains table `t_md_areas`.
   *   - HBase table exists: create 'dim_geo_area','f'
   */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("geohash码地域维表构建任务")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    val sc = spark.sparkContext

    // Target HBase table name
    val tablename = "dim_geo_area"

    // ZooKeeper quorum used by the HBase cluster
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum","doitedu")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    // Tell HBase's TableOutputFormat which table to write to
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // Wrap the Hadoop configuration in a MapReduce Job to carry the output format settings.
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // BUGFIX: the output *value* class must be the mutation type we emit (Put).
    // `Result` is what TableInputFormat produces when READING HBase; using it
    // here contradicted the (ImmutableBytesWritable, Put) pairs built below.
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Load the raw area hierarchy table from MySQL.
    // NOTE(review): credentials are hard-coded; consider externalizing them.
    val props = new Properties()
    props.setProperty("user","root")
    props.setProperty("password","root")

    val df = spark.read.jdbc("jdbc:mysql://doitedu:3306/realtimedw", "t_md_areas", props)
    df.createTempView("t")

    // UDF: encode a (lat, lng) pair as a 5-character geohash string.
    val gps2GeoHashcode = (lat:Double, lng:Double)=> GeoHash.geoHashStringWithCharacterPrecision(lat,lng,5)
    spark.udf.register("geo",gps2GeoHashcode)

    // Build the reference-coordinate dictionary:
    //  - inner query flattens the 4-level area hierarchy via self-joins
    //  - row_number() keeps one row per geohash
    //  - outer filter drops malformed geohashes and blank province/city
    val resDf = spark.sql(
      """
        |select
        |   geohash,
        |   province,
        |   city,
        |   nvl(region,'') as region
        |from(
        |   select
        |      geohash,
        |      province,
        |      city,
        |      region,
        |      -- 利用row_number()over() 对 相同重复的数据 进行去重
        |      row_number() over(partition by geohash order by province) as rn
        |   from
        |   (
        |      -- 对原始地理位置表，进行自关联，将层级数据扁平化
        |      SELECT
        |        geo(lv4.BD09_LAT, lv4.BD09_LNG) as geohash,
        |        lv1.AREANAME as province,
        |        lv2.AREANAME as city,
        |        lv3.AREANAME as region
        |      from t lv4
        |        join t lv3 on lv4.`LEVEL`=4 and lv4.bd09_lat is not null and lv4.bd09_lng is not null and lv4.PARENTID = lv3.ID
        |        join t lv2 on lv3.PARENTID = lv2.ID
        |        join t lv1 on lv2.PARENTID = lv1.ID
        |   ) o1
        |)o2
        |
        |where rn=1 and geohash is not null and length(trim(geohash))=5
        |and province is not null and trim(province)!=''
        |and city is not null and trim(city)!=''
        |
        |""".stripMargin)


    /*resDf.where("geohash='wwcmf'").show()
    sys.exit(1)*/


    // Convert each (geohash, province, city, region) row into the
    // (ImmutableBytesWritable, Put) pair structure required by TableOutputFormat.
    val resRdd = resDf.rdd.map(row => {
      val geoHash = row.getAs[String]("geohash")
      val province = row.getAs[String]("province")
      val city = row.getAs[String]("city")
      val region = row.getAs[String]("region")

      // Rowkey = reversed geohash: neighbouring geohashes share a prefix, so
      // reversing spreads writes across regions and avoids hot-spotting.
      val put = new Put(Bytes.toBytes(geoHash.reverse))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("p"), Bytes.toBytes(province))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(city))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("r"), Bytes.toBytes(region))

      // The key is ignored by TableOutputFormat (the Put carries the rowkey).
      (new ImmutableBytesWritable(), put)

    })


    resRdd.saveAsNewAPIHadoopDataset(job.getConfiguration())

    spark.close()
  }
}
